package org.dhacky

/**
  * MIT License
  *
  * Copyright (c) 2019 DHacky
  */

import java.lang.{Double => JDouble}
import java.sql.DriverManager

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

/**
  * JDBC helpers for a ClickHouse-backed fixture: creates and drops the database and
  * tables, seeds the input table from a fixed in-memory data set, and builds the SQL
  * strings involved.
  *
  * Every connection opened by this object uses [[DB_URL]] only; [[DB_USER]] and
  * [[DB_PASSWORD]] are not referenced here.
  */
object JdbcUtil {
  /** Fully-qualified class name of the JDBC driver loaded before connecting. */
  val DRIVER_CLASS: String = "ru.yandex.clickhouse.ClickHouseDriver"
  /** JDBC URL used for every connection opened by this object. */
  val DB_URL: String = "jdbc:clickhouse://localhost:8123/"
  /** User name; not used by this object (presumably for external callers - TODO confirm). */
  val DB_USER: String = "default"
  /** Password; not used by this object (presumably for external callers - TODO confirm). */
  val DB_PASSWORD: String = ""
  /** Database that holds both tables. */
  val DB_NAME: String = "flinkDB"
  /** Table seeded with the fixture rows by [[prepareDatabase]]. */
  val INPUT_TABLE: String = "inputTB"
  /** Table whose row count is verified by [[checkOutputDB]]. */
  val OUTPUT_TABLE: String = "outputTB"
  /**
    * Flink row type with five fields: Int, String, String, Double, Date.
    *
    * NOTE(review): presumably meant to mirror the columns declared in [[getCreateQuery]]
    * (id, name, email, ratio, date) - confirm against the code that consumes it.
    */
  val ROW_TYPE_INFO: RowTypeInfo = new RowTypeInfo(
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.DATE_TYPE_INFO)

  /**
    * Creates the database and both tables if they do not exist yet, then inserts the
    * fixture rows into [[INPUT_TABLE]].
    *
    * The insert is not guarded: each call adds the fixture rows again.
    *
    * @throws Exception if the driver class cannot be loaded or any statement fails
    */
  @throws[Exception]
  def prepareDatabase(): Unit = withStatement { stat =>
    // create database and tables
    stat.executeUpdate(s"CREATE DATABASE IF NOT EXISTS $DB_NAME")
    stat.executeUpdate(getCreateQuery(INPUT_TABLE))
    stat.executeUpdate(getCreateQuery(OUTPUT_TABLE))
    // insert data into INPUT_TABLE
    stat.execute(getInsertState)
  }

  /**
    * Drops both tables and then the database itself.
    *
    * The DROP statements carry no `IF EXISTS`, so a missing table or database makes the
    * corresponding statement fail.
    *
    * @throws Exception if the driver class cannot be loaded or any statement fails
    */
  @throws[Exception]
  def cleanUpDatabase(): Unit = withStatement { stat =>
    stat.executeUpdate(getDropQuery(INPUT_TABLE))
    stat.executeUpdate(getDropQuery(OUTPUT_TABLE))
    stat.executeUpdate(s"DROP DATABASE $DB_NAME")
  }

  /**
    * @param tableName unqualified table name inside [[DB_NAME]]
    * @return a `SELECT *` over the given table
    */
  def getSelectAll(tableName: String): String = s"SELECT * FROM $DB_NAME.$tableName"

  /** @return a five-placeholder `INSERT` targeting [[OUTPUT_TABLE]] */
  def getInsertTemplate: String = s"INSERT INTO $DB_NAME.$OUTPUT_TABLE VALUES (?,?,?,?,?)"

  /**
    * Builds one multi-row `INSERT` for [[INPUT_TABLE]] containing every fixture row.
    *
    * Rows are joined with commas, so the statement does not end in a dangling comma.
    * Field values are embedded verbatim without escaping; fields 1, 2 and 4 are wrapped
    * in single quotes, fields 0 and 3 are written bare.
    *
    * @return the complete INSERT statement
    */
  def getInsertState: String =
    DATA
      .map { row =>
        s"(${row.getField(0)},'${row.getField(1)}','${row.getField(2)}'," +
          s"${row.getField(3)},'${row.getField(4)}')"
      }
      .mkString(s"INSERT INTO $DB_NAME.$INPUT_TABLE VALUES ", ",", "")

  /**
    * Builds the single-line `CREATE TABLE IF NOT EXISTS` statement for a table inside
    * [[DB_NAME]].
    *
    * The statement is assembled from fragments joined by single spaces, so the result
    * does not depend on the line endings of this source file.
    *
    * @param tableName unqualified table name
    * @return the CREATE TABLE statement
    */
  def getCreateQuery(tableName: String): String =
    Seq(
      s"CREATE TABLE IF NOT EXISTS $DB_NAME.$tableName",
      "(id UInt64,name String,email String,ratio Float32,date Date)",
      "ENGINE = MergeTree() PARTITION BY toYYYYMM(date)",
      "ORDER BY (id, name, email) SETTINGS index_granularity = 8192"
    ).mkString(" ")

  /**
    * @param tableName unqualified table name inside [[DB_NAME]]
    * @return a `DROP TABLE` statement for the given table (no `IF EXISTS`)
    */
  def getDropQuery(tableName: String): String = s"DROP TABLE $DB_NAME.$tableName"

  /**
    * Counts the rows of [[OUTPUT_TABLE]] and prints a message to standard error when the
    * count differs from the number of fixture rows. A mismatch is only reported; nothing
    * is thrown for it.
    */
  def checkOutputDB(): Unit = withConnection { conn =>
    withResource(conn.prepareStatement(getSelectAll(OUTPUT_TABLE))) { statement =>
      withResource(statement.executeQuery()) { resultSet =>
        // advance the cursor until it is exhausted, counting the successful moves
        val count = Iterator.continually(resultSet.next()).takeWhile(identity).size
        if (count != DATA.length) {
          Console.err.println(s"Table $OUTPUT_TABLE has $count rows, but data length is ${DATA.length}")
        }
      }
    }
  }

  /** Runs `body` with `resource` and closes the resource afterwards, even when `body` throws. */
  private def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  /** Loads the driver class, opens a connection to [[DB_URL]] and lends it to `body`. */
  private def withConnection[A](body: java.sql.Connection => A): A = {
    Class.forName(DRIVER_CLASS)
    withResource(DriverManager.getConnection(DB_URL))(body)
  }

  /** Lends a fresh statement to `body`; statement and connection are closed in that order. */
  private def withStatement(body: java.sql.Statement => Unit): Unit =
    withConnection { conn => withResource(conn.createStatement())(body) }

  // data generated by https://mockaroo.com/
  // Only read from method bodies, so declaring it after them is safe with respect to
  // object initialisation order.
  private val DATA: Array[Row] = Array(
    Row.of(Integer.valueOf(1), "Addy", "amcgeagh0@berkeley.edu", JDouble.valueOf(0.4), "2018-09-28"),
    Row.of(Integer.valueOf(2), "Kristos", "kmcshee1@salon.com", JDouble.valueOf(0.9), "2018-10-04"),
    Row.of(Integer.valueOf(3), "Teodorico", "tsked2@google.fr", JDouble.valueOf(0.8), "2018-12-09"),
    Row.of(Integer.valueOf(4), "Emilie", "elory3@vistaprint.com", JDouble.valueOf(0.5), "2018-08-26"),
    Row.of(Integer.valueOf(5), "Lyndy", "lmeiklejohn4@foxnews.com", JDouble.valueOf(0.3), "2018-04-11"),
    Row.of(Integer.valueOf(6), "Delores", "dcuxson5@netlog.com", JDouble.valueOf(0.1), "2018-08-10"),
    Row.of(Integer.valueOf(7), "Terri", "tmintoff6@godaddy.com", JDouble.valueOf(0.1), "2018-10-01"),
    Row.of(Integer.valueOf(8), "Dietrich", "djantot7@skyrock.com", JDouble.valueOf(0.5), "2018-09-23"),
    Row.of(Integer.valueOf(9), "Ilka", "ifaulconbridge8@chron.com", JDouble.valueOf(0.4), "2019-01-25"),
    Row.of(Integer.valueOf(10), "Baldwin", "btocknell9@biblegateway.com", JDouble.valueOf(0.4), "2018-03-16"),
    Row.of(Integer.valueOf(11), "Dwight", "dkelloga@indiegogo.com", JDouble.valueOf(0.7), "2018-07-28"),
    Row.of(Integer.valueOf(12), "Haydon", "hoagb@google.de", JDouble.valueOf(0.6), "2018-03-22"),
    Row.of(Integer.valueOf(13), "Hartley", "hdolemanc@nbcnews.com", JDouble.valueOf(0.9), "2018-11-11"),
    Row.of(Integer.valueOf(14), "Tab", "tshirdd@photobucket.com", JDouble.valueOf(0.1), "2019-02-21"),
    Row.of(Integer.valueOf(15), "Carlynn", "cchrismase@multiply.com", JDouble.valueOf(0.8), "2018-08-04"),
    Row.of(Integer.valueOf(16), "Ossie", "olacazef@time.com", JDouble.valueOf(0.1), "2018-03-16"),
    Row.of(Integer.valueOf(17), "Caria", "cbowlerg@wordpress.org", JDouble.valueOf(0.1), "2018-11-23"),
    Row.of(Integer.valueOf(18), "Charity", "cclaringboldh@digg.com", JDouble.valueOf(0.4), "2018-09-21"),
    Row.of(Integer.valueOf(19), "Lesly", "ljillingsi@marriott.com", JDouble.valueOf(0.2), "2018-11-13"),
    Row.of(Integer.valueOf(20), "Derick", "dblebyj@t.co", JDouble.valueOf(0.2), "2018-05-13"))
}
